package com.jasongj.kafka.consumer;

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class WriteBlock {
    private static final Logger logger = LogManager.getLogger(WriteBlock.class);

    static String baseFilePath = "/home/ubuntu/writeBlock/";
   // static String baseFilePath = "e:/log_dir/";
    static String topic = "";

    public static void main(String[] args) throws Exception {
        //args = new String[] { "kafka0:9092", "topic1", "group2", "consumer2" };
        //args = new String[]{"localhost:9092", "p2p", "test-consumer-group", "consumer3"};
        if (args == null || args.length != 4) {
            System.err.println(
                    "Usage:\n\tjava -jar kafka_consumer.jar ${localhost:9092} ${p2p} ${test-consumer-group} ${consumer3}");
            System.exit(1);
        }

        String bootstrap =  args[0];
        topic = args[1];
        String groupid =  args[2];
        String clientid = args[3];

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupid);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "500");
        props.put("auto.offset.reset", "earliest");
        props.put("deserializer.encoding", "UTF-8");
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.partitions().forEach(topicPartition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                partitionRecords.forEach(record -> {
                    String temp = record.value()+"\n\r";
                    try {
                        writeFile( temp);
                    } catch (IOException e) {
                        logger.info(e.getMessage());
                    } catch (Exception e) {
                        logger.info(e.getMessage());
                    }
                    temp = "";
                });
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1)));
            });
        }
    }

    public static synchronized  void writeFile( String content) throws Exception {
        Date now = new Date();
        SimpleDateFormat dateFormatHour= new SimpleDateFormat("yyyy-MM-dd-HH");
        String  hour = dateFormatHour.format( now );
        SimpleDateFormat dateFormatMinute= new SimpleDateFormat("yyyy-MM-dd-HH-mm");
        String  minute = dateFormatMinute.format( now );
        File dir = new File(baseFilePath+topic+"/"+hour);
        File file = new File(baseFilePath+topic+"/"+hour+"/"+minute);
        //判断日期文件夹是否存在，不存在的话创建
        if (!dir.isDirectory()) {
            dir.mkdirs();
        }
        //判断文件是否存在，不存在的话创建
        if (!file.exists()) {
            file.createNewFile();
        }
        BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(file, true)));
        out.write(content);
        out.close();
    }
}



